package com.alinesno.cloud.base.message.service.impl;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import javax.transaction.Transactional;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;

import com.alinesno.cloud.base.message.entity.TransactionMessageEntity;
import com.alinesno.cloud.base.message.entity.TransactionMessageHistoryEntity;
import com.alinesno.cloud.base.message.enums.MessageDataTypeEnum;
import com.alinesno.cloud.base.message.enums.MessageDeathEnum;
import com.alinesno.cloud.base.message.enums.MessageStatusEnum;
import com.alinesno.cloud.base.message.kafka.KafkaMessageProducer;
import com.alinesno.cloud.base.message.repository.TransactionMessageRepository;
import com.alinesno.cloud.base.message.service.ITransactionMessageHistoryService;
import com.alinesno.cloud.base.message.service.ITransactionMessageService;
import com.alinesno.cloud.common.core.services.impl.IBaseServiceImpl;
import com.alinesno.cloud.common.facade.wrapper.RestWrapper;

/**
 * Service implementation for transactional messages: persists
 * {@link TransactionMessageEntity} records through the inherited persistence
 * operations and publishes message bodies with {@link KafkaMessageProducer}.
 *
 * @author LuoAnDong
 * @since 2018-12-02 15:19:37
 */
@Service
public class TransactionMessageServiceImpl
		extends IBaseServiceImpl<TransactionMessageRepository, TransactionMessageEntity, String>
		implements ITransactionMessageService {

	// Logger
	private static final Logger log = LoggerFactory.getLogger(TransactionMessageServiceImpl.class);

	@Autowired
	private KafkaMessageProducer kafkaMessageProducer;

	@Autowired
	private TransactionMessageRepository transactionMessageRepository;

	@Autowired
	private ITransactionMessageHistoryService transactionMessageHistoryService;

	/**
	 * Validates and stores a message in the {@code PREPARED} state without sending it.
	 * <p>
	 * The data type is forced to JSON, the send counter is reset to 0 and a
	 * version of 0 is replaced by 1.
	 *
	 * @param e the message to store; body, topic and business id must be non-empty
	 * @return the id of the saved message
	 * @throws IllegalArgumentException if the entity or a required field is missing
	 */
	@Override
	public String saveMessageLocal(TransactionMessageEntity e) {

		Assert.notNull(e, "消息实体不能为空.");
		Assert.hasLength(e.getBusinessBody(), "消息内容不能为空.");
		Assert.hasLength(e.getTopic(), "消息主题不能为空.");
		Assert.hasLength(e.getBusinessId(), "消息业务ID字段不能为空.");

		e.setBusinessDateType(MessageDataTypeEnum.JSON.value());
		e.setMessageSendTimes(0);
		e.setMessageStatus(MessageStatusEnum.PREPARED.value());
		// NOTE(review): if getVersions() returns a boxed type this comparison
		// throws a NullPointerException for a null version - confirm the getter type.
		e.setVersions(e.getVersions() == 0 ? 1 : e.getVersions());

		return this.save(e).getId();
	}

	/**
	 * Marks a stored message as {@code SENDING}, saves it and publishes its body
	 * to its topic.
	 *
	 * @param id the message id; must be non-empty
	 * @return {@code true} if the message was found and handed to the producer,
	 *         {@code false} if no message exists for the id
	 * @throws IllegalArgumentException if {@code id} is empty
	 */
	@Transactional
	@Override
	public boolean confirmAndSendMessage(String id) {

		Assert.hasLength(id, "消息主键不能为空.");

		Optional<TransactionMessageEntity> found = findById(id);
		if (!found.isPresent()) {
			return false;
		}

		// Switch the message status to "sending" before publishing.
		TransactionMessageEntity message = found.get();
		message.setMessageStatus(MessageStatusEnum.SENDING.value());
		message.setUpdateTime(new Date());
		this.save(message);

		kafkaMessageProducer.sendMssage(message.getTopic(), message.getBusinessBody());
		return true;
	}

	/**
	 * Stores the message via {@link #saveMessageLocal(TransactionMessageEntity)}
	 * and immediately sends it via {@link #confirmAndSendMessage(String)}.
	 *
	 * @param e the message to store and send
	 * @return the result of {@link #confirmAndSendMessage(String)}
	 */
	@Override
	public boolean saveAndSendMessage(TransactionMessageEntity e) {
		// NOTE(review): confirmAndSendMessage is invoked on "this"; with
		// proxy-based transaction handling its @Transactional may not apply
		// on this path - confirm.
		String messageId = this.saveMessageLocal(e);
		return this.confirmAndSendMessage(messageId);
	}

	/**
	 * Publishes the message body to its topic without persisting anything.
	 *
	 * @param e the message to send; body and topic must be non-empty
	 * @return {@code true} once the message has been handed to the producer
	 * @throws IllegalArgumentException if the entity, body or topic is missing
	 */
	@Override
	public boolean directSendMessage(TransactionMessageEntity e) {

		Assert.notNull(e, "消息实体不能为空.");
		Assert.hasLength(e.getBusinessBody(), "消息内容不能为空.");
		Assert.hasLength(e.getTopic(), "消息主题不能为空.");

		kafkaMessageProducer.sendMssage(e.getTopic(), e.getBusinessBody());
		// Report success, consistent with sendMessage(String, String).
		return true;
	}

	/**
	 * Flags a stored message for re-sending: refreshes the update time, sets the
	 * status to {@code SENDING} and increments the send counter (a missing
	 * counter becomes 1).
	 *
	 * @param id the message id; must be non-empty
	 * @return {@code true} if the message was found and updated, otherwise {@code false}
	 * @throws IllegalArgumentException if {@code id} is empty
	 */
	@Transactional
	@Override
	public boolean reSendMessageById(String id) {
		Assert.hasLength(id, "消息主键不能为空.");

		Optional<TransactionMessageEntity> found = findById(id);
		if (!found.isPresent()) {
			return false;
		}

		// NOTE(review): this only updates the record and does not publish to
		// Kafka; presumably another component picks up SENDING messages - confirm.
		TransactionMessageEntity message = found.get();
		Integer sentSoFar = message.getMessageSendTimes();

		message.setUpdateTime(new Date());
		message.setMessageStatus(MessageStatusEnum.SENDING.value());
		message.setMessageSendTimes(sentSoFar == null ? 1 : sentSoFar + 1);
		this.save(message);

		return true;
	}

	/**
	 * Marks a stored message as dead: sets the dead flag to {@code HAS_DEAD} and
	 * stamps the delete time.
	 *
	 * @param id the message id; must be non-empty
	 * @return {@code true} if the message was found and updated, otherwise {@code false}
	 * @throws IllegalArgumentException if {@code id} is empty
	 */
	@Override
	public boolean setMessageToAreadlyDead(String id) {
		Assert.hasLength(id, "消息主键不能为空.");

		Optional<TransactionMessageEntity> found = findById(id);
		if (!found.isPresent()) {
			return false;
		}

		TransactionMessageEntity message = found.get();
		message.setAreadlyDead(MessageDeathEnum.HAS_DEAD.value());
		message.setDeleteTime(new Date());
		// NOTE(review): a dead message is given status SENDING here, which
		// looks unintended - confirm the expected status.
		message.setMessageStatus(MessageStatusEnum.SENDING.value());

		this.save(message);
		return true;
	}

	/**
	 * Looks up a message by id.
	 *
	 * @param id the message id; must be non-empty
	 * @return the message, or {@code null} if none exists
	 * @throws IllegalArgumentException if {@code id} is empty
	 */
	@Override
	public TransactionMessageEntity getMessageById(String id) {
		Assert.hasLength(id, "消息主键不能为空.");
		return findById(id).orElse(null);
	}

	/**
	 * Looks up a message by id among those whose dead flag is {@code HAS_DEAD}.
	 *
	 * @param id the message id
	 * @return whatever the repository query returns for the id and dead flag
	 */
	@Override
	public TransactionMessageEntity getDeathMessageById(String id) {
		return transactionMessageRepository.findByIdAndAreadlyDead(id, MessageDeathEnum.HAS_DEAD.value());
	}

	/**
	 * Looks up a history message by id through the history service.
	 *
	 * @param id the history message id; must be non-empty
	 * @return the history message, or {@code null} if none exists
	 * @throws IllegalArgumentException if {@code id} is empty
	 */
	@Override
	public TransactionMessageHistoryEntity getHistoryMessageById(String id) {
		Assert.hasLength(id, "消息主键不能为空.");
		return transactionMessageHistoryService.findById(id).orElse(null);
	}

	/**
	 * Deletes a message by id, logging instead of propagating any failure.
	 *
	 * @param id the message id; must be non-empty
	 * @return {@code true} if the delete call completed, {@code false} if it threw
	 * @throws IllegalArgumentException if {@code id} is empty
	 */
	@Override
	public boolean deleteMessageById(String id) {
		Assert.hasLength(id, "消息主键不能为空.");
		try {
			this.deleteById(id);
			return true;
		} catch (Exception e) {
			// The id fills the placeholder; the trailing exception is logged
			// with its stack trace.
			log.error("消息删除失败:{}", id, e);
		}
		return false;
	}

	/**
	 * Sets the status of a stored message to {@code ENABLE} and saves it.
	 *
	 * @param id the message id; must be non-empty
	 * @return {@code true} if the message was found and updated, otherwise {@code false}
	 * @throws IllegalArgumentException if {@code id} is empty
	 */
	@Override
	public boolean confirmMessageByMessageId(String id) {
		Assert.hasLength(id, "消息主键不能为空.");

		Optional<TransactionMessageEntity> found = findById(id);
		if (!found.isPresent()) {
			return false;
		}

		TransactionMessageEntity message = found.get();
		message.setMessageStatus(MessageStatusEnum.ENABLE.value());
		this.save(message);
		return true;
	}

	/**
	 * Applies {@link #reSendMessageById(String)} to the first page of messages
	 * of a topic.
	 *
	 * @param topic     the topic to process; must be non-empty
	 * @param batchSize the page size used to fetch the messages
	 * @return the ids for which {@link #reSendMessageById(String)} returned
	 *         {@code true}; empty when there were none, never {@code null}
	 * @throws IllegalArgumentException if {@code topic} is empty
	 */
	@Override
	public List<String> reSendAllMessageByTopic(String topic , int batchSize) {
		Assert.hasLength(topic , "消息主题不能为空.");

		List<String> resentIds = new ArrayList<String>();
		Page<TransactionMessageEntity> firstPage = this.getMessageByTopic(topic, 0, batchSize);
		for (TransactionMessageEntity message : firstPage.getContent()) {
			if (this.reSendMessageById(message.getId())) {
				resentIds.add(message.getId());
			}
		}

		return resentIds;
	}

	/**
	 * Returns one page of messages matching every entry of {@code params}.
	 *
	 * @param pageNow  the page index passed to {@link PageRequest#of(int, int)}
	 * @param pageSize the page size
	 * @param params   the criteria handed to {@code RestWrapper.allEq}
	 * @return the matching page
	 */
	@Override
	public Page<TransactionMessageEntity> listMessagePage(int pageNow, int pageSize, Map<String, Object> params) {
		RestWrapper wrapper = new RestWrapper();
		wrapper.allEq(params);

		Pageable pageable = PageRequest.of(pageNow, pageSize);
		return this.findAll(wrapper.toSpecification(), pageable);
	}

	/**
	 * Publishes a raw message body to a topic without persisting anything.
	 *
	 * @param messageBody the body to send; must be non-empty
	 * @param topicName   the target topic; must be non-empty
	 * @return {@code true} once the message has been handed to the producer
	 * @throws IllegalArgumentException if the body or topic is empty
	 */
	@Override
	public boolean sendMessage(String messageBody, String topicName) {

		Assert.hasLength(messageBody, "消息内容不能为空.");
		Assert.hasLength(topicName, "消息主题不能为空.");

		kafkaMessageProducer.sendMssage(topicName, messageBody);
		return true;
	}

	/**
	 * Returns one page of messages for a topic; a blank topic adds no filter.
	 *
	 * @param topic    the topic to filter by, may be blank
	 * @param pageNow  the page index
	 * @param pageSize the page size
	 * @return the matching page
	 */
	@Override
	public Page<TransactionMessageEntity> getMessageByTopic(String topic , int pageNow, int pageSize) {
		Map<String, Object> criteria = new HashMap<String, Object>();
		if (StringUtils.isNotBlank(topic)) {
			criteria.put("topic", topic);
		}
		return this.listMessagePage(pageNow, pageSize, criteria);
	}

	/**
	 * Returns one page of messages matching {@code params}, additionally
	 * filtered by topic when the topic is not blank.
	 *
	 * @param topic    the topic to filter by, may be blank
	 * @param params   extra criteria; may be {@code null} and is never modified
	 * @param pageNow  the page index
	 * @param pageSize the page size
	 * @return the matching page
	 */
	@Override
	public Page<TransactionMessageEntity> getMessageByTopicAndParam(String topic, Map<String, Object> params, int pageNow, int pageSize) {
		// Work on a copy so a null or immutable map is accepted and the
		// caller's map is left untouched.
		Map<String, Object> criteria = new HashMap<String, Object>();
		if (params != null) {
			criteria.putAll(params);
		}
		if (StringUtils.isNotBlank(topic)) {
			criteria.put("topic", topic);
		}
		return this.listMessagePage(pageNow, pageSize, criteria);
	}

}
